Refactor caching of data for multilayer algorithms#118

Merged
knoepfel merged 10 commits into Framework-R-D:main from knoepfel:fix-caching
Feb 24, 2026

Conversation

@knoepfel
Member

@knoepfel knoepfel commented Nov 14, 2025

Issue #24 describes the need to adjust the caching of data products. The chosen adjustment is described at #24 (comment) and explained here.

Caching performed by the framework graph

The Phlex framework implements caching to ensure that algorithms can operate on data products from correctly-associated data cells. The caching technology is based largely on oneTBB Flow Graph's tbb::flow::join_node<...> template. This join node is used in tag-matching mode, where the input data on the join node's ports are emitted together when the "tags" corresponding to the data across the input ports match.

To ensure that data from different data layers can be matched together, each data product (or, more properly, the product store that contains the data product) is included as part of a message that contains an ID. The message ID (an object of type std::size_t) is inspected by the tbb::flow::join_node<...> to match data from the join node's ports.

With the current implementation of oneTBB's Flow Graph, there is no way to tell a join node to (e.g.) reuse a particular data product from a run for a given set of spills within that run. The same data product must be repeatedly presented to the join node even though there may be many spills for one run. How this problem is solved depends on the implementation and is described below.

Current implementation

A workflow that demonstrates how this works with the present implementation is shown below.

[Screenshot: workflow diagram for the current implementation]

The nodes provide(A), transform(A), and observe(C) should execute only once per r encountered. The provide(B), transform(B), and transform(C, D) nodes should execute once per (r, s) encountered.

In order to ensure that the transform(C, D) can execute once per spill, the data product $c_r$ must be presented to the join node the same number of times (and with the same message ID) as each of the data products $d_{rs}$ coming from transform(B). This is achieved by sending all of the spills to the provide(A) node, reusing any data products already created by that node, and then passing them downstream until the data product $c_r$ can be re-presented to the join node. This is awkward for various reasons:

  1. The nodes provide(A), transform(A), and observe(C) do not conceptually operate on spill-level data products.
  2. All nodes must cache their results, even when caching is not necessary.
  3. Only the $[c_r] \times [n_{rs}]$ data products are matched with $[d_{rs}]$. The join node has another $[c_r]$ data products cached at one of its input ports until the end of the program (there will be no corresponding $[d_r]$ matches).
  4. Some nodes' caches are flushed prematurely, thus unnecessarily requiring the re-execution of a node.

Taken together, these behaviors make program behavior unpredictable for the user.

With this PR

This PR localizes caching to only the nodes that require multiple inputs.

[Screenshot: workflow diagram with this PR's localized caching]

The benefits to this approach:

  1. The nodes provide(A), transform(A), and observe(C) operate only on run-level data products.
  2. Only nodes requiring multiple data products as input must cache the required data.
  3. There are no unmatched data products at any of the join node's input ports.
  4. Caches are flushed only when the node that requires a particular data product no longer needs it.

The major changes in this PR

  1. The multiplexer ("router" in earlier diagrams) has been replaced by an index_router class (labeled "Index Router" in the above image).
  2. Explicit "index set" nodes have been added, which are responsible for reacting to data-cell indices belonging to a specific layer (this cleans up an infelicity in how provider nodes were implemented).
  3. Caching has been removed from all nodes except those that require joining two or more arguments:
    1. For nodes with two or more products that belong to the same data layer, a multilayer_join_node is created in such a manner that only the standard tbb::flow::join_node<...> is used to tag-match the inputs.
    2. For nodes with two or more products that belong to different data layers, a multilayer_join_node is created with "repeaters", which cache data products from the different layers while emitting them as needed to match the other inputs for the node.
  4. A one-slot std::tuple<message> message type is no longer used for nodes that consume only one data product, removing unnecessary complexity and operations in presenting data to such nodes.

Contributor

Copilot AI left a comment


Pull request overview

Explores introducing a “repeater” concept for multi-layer consumption by refactoring how flush/caching signals are delivered through the flow graph, and adds a dedicated test harness to exercise the approach.

Changes:

  • Added a new test/repeater test suite with an index_router and repeater_node prototype to explore cached product reuse across layers.
  • Refactored flush propagation: flush messages are no longer routed through the multiplexer; they are broadcast via dedicated flusher nodes and consumed through new flush_port() receivers on core node types.
  • Tightened several tests from “>=” to strict “==” execution-count expectations and exposed layer_generator::layer_paths() for routing.

Reviewed changes

Copilot reviewed 27 out of 27 changed files in this pull request and generated 12 comments.

File Description
test/repeater/repeater_node.hpp Adds prototype repeater composite node with caching/flush-driven replay behavior.
test/repeater/repeater.cpp Adds Catch2 test constructing a multi-layer graph using the repeater exploration nodes.
test/repeater/nodes.hpp Adds provider/consumer and multi-argument consumer test nodes built around repeater nodes.
test/repeater/message_types.hpp Defines test message types and matchers used by the repeater graph.
test/repeater/index_router.hpp Declares index routing helper for broadcasting indices/flush tokens to multi-layer nodes.
test/repeater/index_router.cpp Implements routing/backout logic and multilayer broadcasting.
test/repeater/CMakeLists.txt Builds the repeater test and supporting index_router library.
test/CMakeLists.txt Enables building the new test/repeater subdirectory.
test/hierarchical_nodes.cpp Tightens execution count check for get_the_time.
test/cached_execution.cpp Tightens cached execution checks from >= to ==.
test/allowed_families.cpp Tightens provider execution count checks from >= to ==.
plugins/layer_generator.hpp Exposes layer_paths() accessor for test routing.
phlex/core/store_counters.hpp Adds detect_flush_flag::receive_flush(message const&) declaration to centralize flush handling.
phlex/core/store_counters.cpp Implements receive_flush() and fixes map insertion logic for store_flag.
phlex/core/multiplexer.cpp Stops routing flush messages through the multiplexer (now asserts non-flush only).
phlex/core/message_sender.hpp Switches flush delivery dependency from multiplexer to flusher_t.
phlex/core/message_sender.cpp Sends flush messages via flusher_t broadcast node instead of multiplexer.
phlex/core/fwd.hpp Adds message fwd-decl and introduces flusher_t alias.
phlex/core/framework_graph.hpp Adds a graph-wide flusher_ and wires message_sender to it.
phlex/core/framework_graph.cpp Connects graph-wide and unfold-local flushers to each node’s new flush_port().
phlex/core/declared_unfold.hpp Adds flush_port(), per-unfold flusher(), and tracks child_layer on unfolds.
phlex/core/declared_unfold.cpp Stores child_layer and uses parent-based flush store generation.
phlex/core/declared_transform.hpp Adds flush_port() receiver node; main transform path now asserts non-flush.
phlex/core/declared_provider.hpp Adds flush_port() receiver node; main provider path now asserts non-flush.
phlex/core/declared_predicate.hpp Adds flush_port() receiver node; main predicate path now asserts non-flush.
phlex/core/declared_observer.hpp Adds flush_port() receiver node; main observer path now asserts non-flush.
phlex/core/declared_fold.hpp Adds flush_port() receiver node and removes flush handling from the main fold path.

@knoepfel knoepfel force-pushed the fix-caching branch 2 times, most recently from e62df3f to b232e10 on February 3, 2026 23:06
@codecov

codecov bot commented Feb 3, 2026

Codecov Report

❌ Patch coverage is 94.63087% with 24 lines in your changes missing coverage. Please review.

Files with missing lines Patch % Lines
phlex/core/index_router.cpp 91.74% 5 Missing and 4 partials ⚠️
phlex/core/multilayer_join_node.hpp 90.19% 2 Missing and 3 partials ⚠️
phlex/core/detail/repeater_node.cpp 95.78% 0 Missing and 4 partials ⚠️
phlex/core/declared_fold.hpp 95.23% 2 Missing ⚠️
phlex/core/declared_provider.cpp 0.00% 1 Missing ⚠️
phlex/core/declared_unfold.cpp 75.00% 1 Missing ⚠️
phlex/core/declared_unfold.hpp 96.00% 1 Missing ⚠️
phlex/core/store_counters.cpp 50.00% 0 Missing and 1 partial ⚠️
@@            Coverage Diff             @@
##             main     #118      +/-   ##
==========================================
+ Coverage   82.24%   83.22%   +0.98%     
==========================================
  Files         127      129       +2     
  Lines        3103     3220     +117     
  Branches      547      546       -1     
==========================================
+ Hits         2552     2680     +128     
+ Misses        334      329       -5     
+ Partials      217      211       -6     
Flag Coverage Δ
unittests 83.22% <94.63%> (+0.98%) ⬆️

Flags with carried forward coverage won't be shown.

Files with missing lines Coverage Δ
phlex/core/declared_observer.cpp 100.00% <ø> (+66.66%) ⬆️
phlex/core/declared_observer.hpp 100.00% <100.00%> (ø)
phlex/core/declared_output.cpp 100.00% <100.00%> (ø)
phlex/core/declared_predicate.cpp 100.00% <ø> (+50.00%) ⬆️
phlex/core/declared_predicate.hpp 94.73% <100.00%> (+5.08%) ⬆️
phlex/core/declared_provider.hpp 100.00% <100.00%> (ø)
phlex/core/declared_transform.cpp 100.00% <ø> (ø)
phlex/core/declared_transform.hpp 89.47% <100.00%> (-1.84%) ⬇️
phlex/core/edge_maker.cpp 77.27% <100.00%> (ø)
phlex/core/edge_maker.hpp 100.00% <100.00%> (ø)
... and 19 more

... and 3 files with indirect coverage changes


Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 67d1adc...aacdc3f.


@knoepfel knoepfel linked an issue Feb 4, 2026 that may be closed by this pull request
@greenc-FNAL
Contributor

Review the full CodeQL report for details.

@knoepfel knoepfel force-pushed the fix-caching branch 5 times, most recently from 6d12aff to ebcd588 on February 16, 2026 18:12
@knoepfel knoepfel marked this pull request as ready for review February 16, 2026 18:18
@knoepfel knoepfel changed the title Explore using repeater nodes Refactor caching of data for multilayer algorithms Feb 16, 2026
@knoepfel knoepfel force-pushed the fix-caching branch 3 times, most recently from 1be372f to 8ecc4f8 on February 19, 2026 14:51
- Separate flush messages from regular stores
- Consolidate flush reception
- Use flush ports instead of conditional logic
- Remove 'flush' stage
- Introduce multilayer_join_node
- Add repeater_node test
- Add DOT file that depicts caching in data flow
Member

@marcpaterno marcpaterno left a comment


Please see the comments attached to the code.

@knoepfel knoepfel dismissed marcpaterno’s stale review February 24, 2026 15:37

All comments addressed by commit aacdc3f

@knoepfel knoepfel requested a review from marcpaterno February 24, 2026 15:37
@knoepfel knoepfel merged commit 87fede8 into Framework-R-D:main Feb 24, 2026
57 of 58 checks passed
@knoepfel knoepfel deleted the fix-caching branch February 24, 2026 23:16


Development

Successfully merging this pull request may close these issues.

Refactor caching of data products

4 participants